Skip to content

MINOR: Cleanup Connect Module (1/n) #19869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
Expand All @@ -44,31 +42,31 @@ public class ConnectSchema implements Schema {
private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();

static {
SCHEMA_TYPE_CLASSES.put(Type.INT8, Collections.singletonList(Byte.class));
SCHEMA_TYPE_CLASSES.put(Type.INT16, Collections.singletonList(Short.class));
SCHEMA_TYPE_CLASSES.put(Type.INT32, Collections.singletonList(Integer.class));
SCHEMA_TYPE_CLASSES.put(Type.INT64, Collections.singletonList(Long.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Collections.singletonList(Float.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Collections.singletonList(Double.class));
SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Collections.singletonList(Boolean.class));
SCHEMA_TYPE_CLASSES.put(Type.STRING, Collections.singletonList(String.class));
SCHEMA_TYPE_CLASSES.put(Type.INT8, List.of(Byte.class));
SCHEMA_TYPE_CLASSES.put(Type.INT16, List.of(Short.class));
SCHEMA_TYPE_CLASSES.put(Type.INT32, List.of(Integer.class));
SCHEMA_TYPE_CLASSES.put(Type.INT64, List.of(Long.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, List.of(Float.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, List.of(Double.class));
SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, List.of(Boolean.class));
SCHEMA_TYPE_CLASSES.put(Type.STRING, List.of(String.class));
// Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and
// hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause
// those methods to fail, so ByteBuffers are recommended
SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList(byte[].class, ByteBuffer.class));
SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Collections.singletonList(List.class));
SCHEMA_TYPE_CLASSES.put(Type.MAP, Collections.singletonList(Map.class));
SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Collections.singletonList(Struct.class));
SCHEMA_TYPE_CLASSES.put(Type.BYTES, List.of(byte[].class, ByteBuffer.class));
SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.of(List.class));
SCHEMA_TYPE_CLASSES.put(Type.MAP, List.of(Map.class));
SCHEMA_TYPE_CLASSES.put(Type.STRUCT, List.of(Struct.class));

for (Map.Entry<Type, List<Class<?>>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
for (Class<?> schemaClass : schemaClasses.getValue())
JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
}

LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Collections.singletonList(BigDecimal.class));
LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Collections.singletonList(java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Collections.singletonList(java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Collections.singletonList(java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, List.of(BigDecimal.class));
LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, List.of(java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, List.of(java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, List.of(java.util.Date.class));
// We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
// schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
// they should not be used without schemas.
Expand Down Expand Up @@ -110,7 +108,7 @@ public ConnectSchema(Type type, boolean optional, Object defaultValue, String na
this.parameters = parameters;

if (this.type == Type.STRUCT) {
this.fields = fields == null ? Collections.emptyList() : fields;
this.fields = fields == null ? List.of() : fields;
this.fieldsByName = new HashMap<>(this.fields.size());
for (Field field : this.fields)
fieldsByName.put(field.name(), field);
Expand Down Expand Up @@ -285,7 +283,7 @@ private static Schema assertSchemaNotNull(Schema schema, String location) {
private static List<Class<?>> expectedClassesFor(Schema schema) {
List<Class<?>> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), Collections.emptyList());
expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), List.of());
return expectedClasses;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,10 @@ public String getName() {
}

public boolean isPrimitive() {
switch (this) {
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT32:
case FLOAT64:
case BOOLEAN:
case STRING:
case BYTES:
return true;
}
return false;
return switch (this) {
case INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES -> true;
default -> false;
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,13 @@ public static Object project(Schema source, Object record, Schema target) throws
}

private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException {
switch (target.type()) {
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT32:
case FLOAT64:
case BOOLEAN:
case BYTES:
case STRING:
return projectPrimitive(source, record, target);
case STRUCT:
return projectStruct(source, (Struct) record, target);
case ARRAY:
return projectArray(source, record, target);
case MAP:
return projectMap(source, record, target);
}
return null;
return switch (target.type()) {
case INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, BYTES, STRING ->
projectPrimitive(source, record, target);
case STRUCT -> projectStruct(source, (Struct) record, target);
case ARRAY -> projectArray(source, record, target);
case MAP -> projectMap(source, record, target);
};
}

private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException {
Expand Down Expand Up @@ -161,28 +149,15 @@ private static Object projectPrimitive(Schema source, Object record, Schema targ
assert target.type().isPrimitive();
Object result;
if (isPromotable(source.type(), target.type()) && record instanceof Number numberRecord) {
switch (target.type()) {
case INT8:
result = numberRecord.byteValue();
break;
case INT16:
result = numberRecord.shortValue();
break;
case INT32:
result = numberRecord.intValue();
break;
case INT64:
result = numberRecord.longValue();
break;
case FLOAT32:
result = numberRecord.floatValue();
break;
case FLOAT64:
result = numberRecord.doubleValue();
break;
default:
throw new SchemaProjectorException("Not promotable type.");
}
result = switch (target.type()) {
case INT8 -> numberRecord.byteValue();
case INT16 -> numberRecord.shortValue();
case INT32 -> numberRecord.intValue();
case INT64 -> numberRecord.longValue();
case FLOAT32 -> numberRecord.floatValue();
case FLOAT64 -> numberRecord.doubleValue();
default -> throw new SchemaProjectorException("Not promotable type.");
};
} else {
result = record;
}
Expand Down
69 changes: 24 additions & 45 deletions connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,33 +430,20 @@ protected static Object convertTo(Schema toSchema, Schema fromSchema, Object val
}
throw new DataException("Unable to convert a null value to a schema that requires a value");
}
switch (toSchema.type()) {
case BYTES:
return convertMaybeLogicalBytes(toSchema, value);
case STRING:
return convertToString(fromSchema, value);
case BOOLEAN:
return convertToBoolean(fromSchema, value);
case INT8:
return convertToByte(fromSchema, value);
case INT16:
return convertToShort(fromSchema, value);
case INT32:
return convertMaybeLogicalInteger(toSchema, fromSchema, value);
case INT64:
return convertMaybeLogicalLong(toSchema, fromSchema, value);
case FLOAT32:
return convertToFloat(fromSchema, value);
case FLOAT64:
return convertToDouble(fromSchema, value);
case ARRAY:
return convertToArray(toSchema, value);
case MAP:
return convertToMapInternal(toSchema, value);
case STRUCT:
return convertToStructInternal(toSchema, value);
}
throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
return switch (toSchema.type()) {
case BYTES -> convertMaybeLogicalBytes(toSchema, value);
case STRING -> convertToString(fromSchema, value);
case BOOLEAN -> convertToBoolean(fromSchema, value);
case INT8 -> convertToByte(fromSchema, value);
case INT16 -> convertToShort(fromSchema, value);
case INT32 -> convertMaybeLogicalInteger(toSchema, fromSchema, value);
case INT64 -> convertMaybeLogicalLong(toSchema, fromSchema, value);
case FLOAT32 -> convertToFloat(fromSchema, value);
case FLOAT64 -> convertToDouble(fromSchema, value);
case ARRAY -> convertToArray(toSchema, value);
case MAP -> convertToMapInternal(toSchema, value);
case STRUCT -> convertToStructInternal(toSchema, value);
};
}

private static Serializable convertMaybeLogicalBytes(Schema toSchema, Object value) {
Expand Down Expand Up @@ -1144,21 +1131,15 @@ private static Schema mergeSchemas(Schema previous, Schema newSchema) {
Type previousType = previous.type();
Type newType = newSchema.type();
if (previousType != newType) {
switch (previous.type()) {
case INT8:
return commonSchemaForInt8(newSchema, newType);
case INT16:
return commonSchemaForInt16(previous, newSchema, newType);
case INT32:
return commonSchemaForInt32(previous, newSchema, newType);
case INT64:
return commonSchemaForInt64(previous, newSchema, newType);
case FLOAT32:
return commonSchemaForFloat32(previous, newSchema, newType);
case FLOAT64:
return commonSchemaForFloat64(previous, newType);
}
return null;
return switch (previous.type()) {
case INT8 -> commonSchemaForInt8(newSchema, newType);
case INT16 -> commonSchemaForInt16(previous, newSchema, newType);
case INT32 -> commonSchemaForInt32(previous, newSchema, newType);
case INT64 -> commonSchemaForInt64(previous, newSchema, newType);
case FLOAT32 -> commonSchemaForFloat32(previous, newSchema, newType);
case FLOAT64 -> commonSchemaForFloat64(previous, newType);
default -> null;
};
}
if (previous.isOptional() == newSchema.isOptional()) {
// Use the optional one
Expand Down Expand Up @@ -1273,10 +1254,8 @@ public boolean canDetect(Object value) {
}
if (knownType == null) {
knownType = schema.type();
} else if (knownType != schema.type()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            if (knownType == null) knownType = schema.type();
            return knownType == schema.type();

How about using this style?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I will push an update.
Thanks!

return false;
}
return true;
return knownType == schema.type();
}

public Schema schema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.storage;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
Expand All @@ -37,7 +36,7 @@ public enum ConverterType {
for (ConverterType type : types) {
nameToType.put(type.name, type);
}
NAME_TO_TYPE = Collections.unmodifiableMap(nameToType);
NAME_TO_TYPE = Map.copyOf(nameToType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -33,15 +32,15 @@ public class ConnectorReconfigurationTest {
@Test
public void testDefaultReconfigure() {
TestConnector conn = new TestConnector(false);
conn.reconfigure(Collections.emptyMap());
assertEquals(conn.stopOrder, 0);
assertEquals(conn.configureOrder, 1);
conn.reconfigure(Map.of());
assertEquals(0, conn.stopOrder);
assertEquals(1, conn.configureOrder);
}

@Test
public void testReconfigureStopException() {
TestConnector conn = new TestConnector(true);
assertThrows(ConnectException.class, () -> conn.reconfigure(Collections.emptyMap()));
assertThrows(ConnectException.class, () -> conn.reconfigure(Map.of()));
}

private static class TestConnector extends Connector {
Expand Down
Loading